RabbitMQ 常用的工作模式学习
参考资料 官方文档 RabbitMQ Tutorials 参考资料 RabbitMQ的六种工作模式
配置 Maven 依赖
这篇笔记主要使用 Java 演示 RabbitMQ 的六种工作模式,所以得先配置一下环境
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
channel 信道
概念:信道是生产消费者与 rabbit 通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?
就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。
疑问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能, 每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接) 连接到rabbit上。
类似概念:TCP是电缆,信道就是里面的光纤,每个光纤都是独立的,互不影响。
确认机制(ack)
1、发送方确认模式:消息发送到交换器 > 发送完毕 > 消息投递到队列或持久化到磁盘异步回调通知生产者
2、消费者确认机制:消息投递消费者 > ack > 删除该条消息 > 投递下一条
注:收到ACK前,不会把消息再次发送给该消费者,但是会把下一条消息发送给其他消费者
simple 简单模式
注意:这种模式是点对点的,所以一个生产者对应一个消费者
最简单的一个消费者和一个生产者的模式,生产者生成消息,消费者监听消息,若是消费者监听到它所需要的消息,就会消费该消息,这种消息是一次性的,被消费了就没有了。
生产数据
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建一个 MQ 的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 创建连接 rabbitmq 的主机(注意!!这里有个坑,不能使用 127.0.0.1,应该使用 localhost 才行)
connectionFactory.setHost("localhost");
// 设置端口号(注意不是 15672)
connectionFactory.setPort(5672);
// 设置要连接的虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码(必须使用密码登陆,所以别使用无密码的账户)
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 通过上面的连接工厂获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 绑定对应的消息队列
// 参数1:队列名称(如果不存在则会自动创建)
// 参数2:设置是否持久化(true 持久化)
// 参数3:exclusive 设置是否独占
// 参数4:autoDelete 设置是否删除
// 参数5:附加参数
channel.queueDeclare("hello",false,false,false,null);
// 发布消息(注意:上面只是绑定数据到队列上)
// 参数一:交换机名称
// 参数二:发布的队列
// 参数三:属性设置(MessageProperties.PERSISTENT_TEXT_PLAIN 设置当前消息的持久化)
// 参数四:发布的具体内容(需要使用 Byte类型)
channel.basicPublish("","hello",null,"hello rabbitMQ".getBytes());
channel.close();
connectionFactory.clone();
}
注意:设置持久化时队列持久化不代表消息持久化,且生产者和消费者设置的参数必须是一样的,例如生产者设置为持久化,消费者也必须加上持久化这个属性
消费数据
注意:这个不能使用 Junit,因为当执行到回调函数之前单元测试的线程就结束了,所以应该放在 main 函数里面执行
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 绑定对应的消息队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
// 注意:因为那个回调函数是 “异步” 的,所以这里就不用关闭通道
抽象成工具类
因为每次都要重复的编写这个连接,所以将其抽象成工具类
public class RabbitMQConnection {
private RabbitMQConnection(){}
private static final ConnectionFactory connectionFactory;
static {
// 创建一个 MQ 的连接工厂对象
connectionFactory = new ConnectionFactory();
// 创建连接 rabbitmq 的主机
connectionFactory.setHost("localhost");
// 设置端口号
connectionFactory.setPort(5672);
// 设置要连接的虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码(因为没有设置密码,所以直接略过)
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}
public static Connection getConnection() throws IOException, TimeoutException {
// 通过上面的连接工厂获取连接对象
return connectionFactory.newConnection();
}
// 关闭
public static void closeConnectionAndChannel(Channel channel,Connection connection) throws IOException, TimeoutException {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
工作模型(Work Queue)
如图,这种模型一个队列对应多个消费者,队列中的消息一旦被消费就会消失,确保了任务不会被重复执行
注意:Rabbit 队列默认情况下是平均分配消息的,例如两个消费者,就对半分这些消息,所以就有一个问题,慢的和快的消费者消费速度不同,但是分配的消息却是相同的,造成了性能的浪费(速度快的消费完消息,而慢的还在消费)
为了解决这个问题:需要设置一次只能消费一条数据,且关闭消费者的自动确认机制(就是收到消息自动删除队列里的消息,而不去考虑是否真的消费完成)
// 设置为一次只能消费一条数据
channel.basicQos(1);
// 使用第二个参数关闭它就好了
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
// 手动确认消息已经消费
// 参数一:确认的消息标签(确认队列中某个具体的消息) 参数二:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,("number" + i + "work Queue").getBytes());
}
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}
消费者01
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
// 设置为一次只能消费一条数据
channel.basicQos(1);
// 使用第二个参数关闭它就好了
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
// 手动确认消息已经消费
// 参数一:确认的消息标签(确认队列中某个具体的消息) 参数二:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
消费者02
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicQos(1);
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 这里做个延迟
Thread.sleep(10);
System.out.println("消费者-2" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
发布订阅模型
又称 “广播” 模型(fanout)
特点:
- 可以多消费者
- 每个消费者都有自己的队列
- 每个队列都要绑定到交换机上
- 交换机把消息发送给绑定过的所有队列(广播)
如下:生产者发送一条消息,下面绑定了该交换机的消费者都能收到这条消息
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
// 参数一:交换机名称 参数二:交换机类型(这里的 fanout 是广播)
channel.exchangeDeclare("test","fanout");
// 发送消息到交换机
channel.basicPublish("test","",null,"test fanout exchange message!".getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}
消费者01
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("test","fanout");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"test","");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}
消费者02
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("test","fanout");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"test","");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2" + new String(body));
}
});
}
}
路由模型-直连(Direct)
在 Fanout 模型中,一条消息会被所有订阅的队列消费。如果希望不同的消息被不同的队列消费就要使用到 Direct 类型的 Exchange
在这种直连模型下:
- 队列与交换机绑定需要加上一个 RoutingKey
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
- Exchange 不再把消息交给每一个绑定的队列,而是根据 RoutingKey 来判断
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
// 发送消息
String routingKey = "info";
channel.basicPublish("logs_direct",routingKey,null,("test direct model. this key:[" + routingKey + "]").getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}
消费者01
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct","direct");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列(绑定多种类型的 routing key)
channel.queueBind(queueName,"logs_direct","error");
channel.queueBind(queueName,"logs_direct","info");
channel.queueBind(queueName,"logs_direct","warning");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}
注意:下面只绑定了一个 routingKey
消费者02
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct","direct");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"logs_direct","info");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2" + new String(body));
}
});
}
}
路由模型-直连(Topics)
动态路由模型,就是上面那种 Direct 模型的改进版,使之支持统配符了
因为 Routing Key 一般都是由一个或多个词组成,多个词之间使用 .
进行分隔,所以下面的通配符就是基于这个进行匹配
*
(star) 代替一个词#
(hash) 代替多个词
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_topic","topic");
// 发送消息
String routingKey = "info";
channel.basicPublish("logs_topic",routingKey,null,("test topic model. this key:[" + routingKey + "]").getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}
消费者
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_topic","topic");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列(绑定多种类型的 routing key)
channel.queueBind(queueName,"logs_topic","*");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}